1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package com.google.common.util.concurrent;
18
19 import static com.google.common.base.Preconditions.checkArgument;
20 import static com.google.common.base.Preconditions.checkNotNull;
21
22 import com.google.common.annotations.Beta;
23 import com.google.common.annotations.VisibleForTesting;
24 import com.google.common.base.Supplier;
25 import com.google.common.base.Throwables;
26 import com.google.common.collect.Lists;
27 import com.google.common.collect.Queues;
28 import com.google.common.util.concurrent.ForwardingListenableFuture.SimpleForwardingListenableFuture;
29
30 import java.lang.reflect.InvocationTargetException;
31 import java.util.Collection;
32 import java.util.Collections;
33 import java.util.Iterator;
34 import java.util.List;
35 import java.util.concurrent.BlockingQueue;
36 import java.util.concurrent.Callable;
37 import java.util.concurrent.Delayed;
38 import java.util.concurrent.ExecutionException;
39 import java.util.concurrent.Executor;
40 import java.util.concurrent.ExecutorService;
41 import java.util.concurrent.Executors;
42 import java.util.concurrent.Future;
43 import java.util.concurrent.RejectedExecutionException;
44 import java.util.concurrent.ScheduledExecutorService;
45 import java.util.concurrent.ScheduledFuture;
46 import java.util.concurrent.ScheduledThreadPoolExecutor;
47 import java.util.concurrent.ThreadFactory;
48 import java.util.concurrent.ThreadPoolExecutor;
49 import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
50 import java.util.concurrent.TimeUnit;
51 import java.util.concurrent.TimeoutException;
52 import java.util.concurrent.locks.Condition;
53 import java.util.concurrent.locks.Lock;
54 import java.util.concurrent.locks.ReentrantLock;
55
56
57
58
59
60
61
62
63
64
65 public final class MoreExecutors {
// Only static members are exposed; the private constructor blocks outside instantiation.
private MoreExecutors() {}
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83 @Beta
84 public static ExecutorService getExitingExecutorService(
85 ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
86 return new Application()
87 .getExitingExecutorService(executor, terminationTimeout, timeUnit);
88 }
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106 @Beta
107 public static ScheduledExecutorService getExitingScheduledExecutorService(
108 ScheduledThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
109 return new Application()
110 .getExitingScheduledExecutorService(executor, terminationTimeout, timeUnit);
111 }
112
113
114
115
116
117
118
119
120
121
122
123
124 @Beta
125 public static void addDelayedShutdownHook(
126 ExecutorService service, long terminationTimeout, TimeUnit timeUnit) {
127 new Application()
128 .addDelayedShutdownHook(service, terminationTimeout, timeUnit);
129 }
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146 @Beta
147 public static ExecutorService getExitingExecutorService(ThreadPoolExecutor executor) {
148 return new Application().getExitingExecutorService(executor);
149 }
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166 @Beta
167 public static ScheduledExecutorService getExitingScheduledExecutorService(
168 ScheduledThreadPoolExecutor executor) {
169 return new Application().getExitingScheduledExecutorService(executor);
170 }
171
172
173 @VisibleForTesting static class Application {
174
175 final ExecutorService getExitingExecutorService(
176 ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
177 useDaemonThreadFactory(executor);
178 ExecutorService service = Executors.unconfigurableExecutorService(executor);
179 addDelayedShutdownHook(service, terminationTimeout, timeUnit);
180 return service;
181 }
182
183 final ScheduledExecutorService getExitingScheduledExecutorService(
184 ScheduledThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
185 useDaemonThreadFactory(executor);
186 ScheduledExecutorService service = Executors.unconfigurableScheduledExecutorService(executor);
187 addDelayedShutdownHook(service, terminationTimeout, timeUnit);
188 return service;
189 }
190
191 final void addDelayedShutdownHook(
192 final ExecutorService service, final long terminationTimeout, final TimeUnit timeUnit) {
193 checkNotNull(service);
194 checkNotNull(timeUnit);
195 addShutdownHook(MoreExecutors.newThread("DelayedShutdownHook-for-" + service, new Runnable() {
196 @Override
197 public void run() {
198 try {
199
200
201
202
203
204 service.shutdown();
205 service.awaitTermination(terminationTimeout, timeUnit);
206 } catch (InterruptedException ignored) {
207
208 }
209 }
210 }));
211 }
212
213 final ExecutorService getExitingExecutorService(ThreadPoolExecutor executor) {
214 return getExitingExecutorService(executor, 120, TimeUnit.SECONDS);
215 }
216
217 final ScheduledExecutorService getExitingScheduledExecutorService(
218 ScheduledThreadPoolExecutor executor) {
219 return getExitingScheduledExecutorService(executor, 120, TimeUnit.SECONDS);
220 }
221
222 @VisibleForTesting void addShutdownHook(Thread hook) {
223 Runtime.getRuntime().addShutdownHook(hook);
224 }
225 }
226
227 private static void useDaemonThreadFactory(ThreadPoolExecutor executor) {
228 executor.setThreadFactory(new ThreadFactoryBuilder()
229 .setDaemon(true)
230 .setThreadFactory(executor.getThreadFactory())
231 .build());
232 }
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269 @Deprecated public static ListeningExecutorService sameThreadExecutor() {
270 return new DirectExecutorService();
271 }
272
273
274 private static class DirectExecutorService
275 extends AbstractListeningExecutorService {
276
277
278
279
280 private final Lock lock = new ReentrantLock();
281
282
283 private final Condition termination = lock.newCondition();
284
285
286
287
288
289
290
291
292 private int runningTasks = 0;
293 private boolean shutdown = false;
294
295 @Override
296 public void execute(Runnable command) {
297 startTask();
298 try {
299 command.run();
300 } finally {
301 endTask();
302 }
303 }
304
305 @Override
306 public boolean isShutdown() {
307 lock.lock();
308 try {
309 return shutdown;
310 } finally {
311 lock.unlock();
312 }
313 }
314
315 @Override
316 public void shutdown() {
317 lock.lock();
318 try {
319 shutdown = true;
320 } finally {
321 lock.unlock();
322 }
323 }
324
325
326 @Override
327 public List<Runnable> shutdownNow() {
328 shutdown();
329 return Collections.emptyList();
330 }
331
332 @Override
333 public boolean isTerminated() {
334 lock.lock();
335 try {
336 return shutdown && runningTasks == 0;
337 } finally {
338 lock.unlock();
339 }
340 }
341
342 @Override
343 public boolean awaitTermination(long timeout, TimeUnit unit)
344 throws InterruptedException {
345 long nanos = unit.toNanos(timeout);
346 lock.lock();
347 try {
348 for (;;) {
349 if (isTerminated()) {
350 return true;
351 } else if (nanos <= 0) {
352 return false;
353 } else {
354 nanos = termination.awaitNanos(nanos);
355 }
356 }
357 } finally {
358 lock.unlock();
359 }
360 }
361
362
363
364
365
366
367
368
369 private void startTask() {
370 lock.lock();
371 try {
372 if (isShutdown()) {
373 throw new RejectedExecutionException("Executor already shutdown");
374 }
375 runningTasks++;
376 } finally {
377 lock.unlock();
378 }
379 }
380
381
382
383
384 private void endTask() {
385 lock.lock();
386 try {
387 runningTasks--;
388 if (isTerminated()) {
389 termination.signalAll();
390 }
391 } finally {
392 lock.unlock();
393 }
394 }
395 }
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429 public static ListeningExecutorService newDirectExecutorService() {
430 return new DirectExecutorService();
431 }
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449 public static Executor directExecutor() {
450 return DirectExecutor.INSTANCE;
451 }
452
453
454 private enum DirectExecutor implements Executor {
455 INSTANCE;
456 @Override public void execute(Runnable command) {
457 command.run();
458 }
459 }
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479 public static ListeningExecutorService listeningDecorator(
480 ExecutorService delegate) {
481 return (delegate instanceof ListeningExecutorService)
482 ? (ListeningExecutorService) delegate
483 : (delegate instanceof ScheduledExecutorService)
484 ? new ScheduledListeningDecorator((ScheduledExecutorService) delegate)
485 : new ListeningDecorator(delegate);
486 }
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507 public static ListeningScheduledExecutorService listeningDecorator(
508 ScheduledExecutorService delegate) {
509 return (delegate instanceof ListeningScheduledExecutorService)
510 ? (ListeningScheduledExecutorService) delegate
511 : new ScheduledListeningDecorator(delegate);
512 }
513
514 private static class ListeningDecorator
515 extends AbstractListeningExecutorService {
516 private final ExecutorService delegate;
517
518 ListeningDecorator(ExecutorService delegate) {
519 this.delegate = checkNotNull(delegate);
520 }
521
522 @Override
523 public boolean awaitTermination(long timeout, TimeUnit unit)
524 throws InterruptedException {
525 return delegate.awaitTermination(timeout, unit);
526 }
527
528 @Override
529 public boolean isShutdown() {
530 return delegate.isShutdown();
531 }
532
533 @Override
534 public boolean isTerminated() {
535 return delegate.isTerminated();
536 }
537
538 @Override
539 public void shutdown() {
540 delegate.shutdown();
541 }
542
543 @Override
544 public List<Runnable> shutdownNow() {
545 return delegate.shutdownNow();
546 }
547
548 @Override
549 public void execute(Runnable command) {
550 delegate.execute(command);
551 }
552 }
553
554 private static class ScheduledListeningDecorator
555 extends ListeningDecorator implements ListeningScheduledExecutorService {
556 @SuppressWarnings("hiding")
557 final ScheduledExecutorService delegate;
558
559 ScheduledListeningDecorator(ScheduledExecutorService delegate) {
560 super(delegate);
561 this.delegate = checkNotNull(delegate);
562 }
563
564 @Override
565 public ListenableScheduledFuture<?> schedule(
566 Runnable command, long delay, TimeUnit unit) {
567 ListenableFutureTask<Void> task =
568 ListenableFutureTask.create(command, null);
569 ScheduledFuture<?> scheduled = delegate.schedule(task, delay, unit);
570 return new ListenableScheduledTask<Void>(task, scheduled);
571 }
572
573 @Override
574 public <V> ListenableScheduledFuture<V> schedule(
575 Callable<V> callable, long delay, TimeUnit unit) {
576 ListenableFutureTask<V> task = ListenableFutureTask.create(callable);
577 ScheduledFuture<?> scheduled = delegate.schedule(task, delay, unit);
578 return new ListenableScheduledTask<V>(task, scheduled);
579 }
580
581 @Override
582 public ListenableScheduledFuture<?> scheduleAtFixedRate(
583 Runnable command, long initialDelay, long period, TimeUnit unit) {
584 NeverSuccessfulListenableFutureTask task =
585 new NeverSuccessfulListenableFutureTask(command);
586 ScheduledFuture<?> scheduled =
587 delegate.scheduleAtFixedRate(task, initialDelay, period, unit);
588 return new ListenableScheduledTask<Void>(task, scheduled);
589 }
590
591 @Override
592 public ListenableScheduledFuture<?> scheduleWithFixedDelay(
593 Runnable command, long initialDelay, long delay, TimeUnit unit) {
594 NeverSuccessfulListenableFutureTask task =
595 new NeverSuccessfulListenableFutureTask(command);
596 ScheduledFuture<?> scheduled =
597 delegate.scheduleWithFixedDelay(task, initialDelay, delay, unit);
598 return new ListenableScheduledTask<Void>(task, scheduled);
599 }
600
601 private static final class ListenableScheduledTask<V>
602 extends SimpleForwardingListenableFuture<V>
603 implements ListenableScheduledFuture<V> {
604
605 private final ScheduledFuture<?> scheduledDelegate;
606
607 public ListenableScheduledTask(
608 ListenableFuture<V> listenableDelegate,
609 ScheduledFuture<?> scheduledDelegate) {
610 super(listenableDelegate);
611 this.scheduledDelegate = scheduledDelegate;
612 }
613
614 @Override
615 public boolean cancel(boolean mayInterruptIfRunning) {
616 boolean cancelled = super.cancel(mayInterruptIfRunning);
617 if (cancelled) {
618
619 scheduledDelegate.cancel(mayInterruptIfRunning);
620
621
622 }
623 return cancelled;
624 }
625
626 @Override
627 public long getDelay(TimeUnit unit) {
628 return scheduledDelegate.getDelay(unit);
629 }
630
631 @Override
632 public int compareTo(Delayed other) {
633 return scheduledDelegate.compareTo(other);
634 }
635 }
636
637 private static final class NeverSuccessfulListenableFutureTask
638 extends AbstractFuture<Void>
639 implements Runnable {
640 private final Runnable delegate;
641
642 public NeverSuccessfulListenableFutureTask(Runnable delegate) {
643 this.delegate = checkNotNull(delegate);
644 }
645
646 @Override public void run() {
647 try {
648 delegate.run();
649 } catch (Throwable t) {
650 setException(t);
651 throw Throwables.propagate(t);
652 }
653 }
654 }
655 }
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
/**
 * Runs the given tasks on {@code executorService} and returns the result of the first one whose
 * future yields a value without throwing. Every submitted future is cancelled before this
 * method exits, whether it returns or throws.
 *
 * @param executorService the service the tasks are submitted to
 * @param tasks the candidate tasks; must contain at least one element
 * @param timed whether {@code nanos} bounds the total wait
 * @param nanos the remaining time budget in nanoseconds; only consulted when {@code timed}
 * @return the value produced by the first task that completes successfully
 * @throws InterruptedException if interrupted while waiting for a task to complete
 * @throws ExecutionException if every task completed without yielding a value; this is the
 *     most recently recorded failure
 * @throws TimeoutException if {@code timed} and no completed future becomes available within
 *     the remaining budget
 */
static <T> T invokeAnyImpl(ListeningExecutorService executorService,
    Collection<? extends Callable<T>> tasks, boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
  checkNotNull(executorService);
  int ntasks = tasks.size();
  checkArgument(ntasks > 0);
  List<Future<T>> futures = Lists.newArrayListWithCapacity(ntasks);
  // Each submitted future adds itself to this queue once it is done.
  BlockingQueue<Future<T>> futureQueue = Queues.newLinkedBlockingQueue();

  // Tasks are submitted one at a time, and the queue is polled for an already finished future
  // before each further submission. That interleaving is why the loop below has so many
  // branches.

  try {
    // Remember the most recent failure so it can be thrown if no task yields a value.
    ExecutionException ee = null;
    long lastTime = timed ? System.nanoTime() : 0;
    Iterator<? extends Callable<T>> it = tasks.iterator();

    // Always start the first task; the rest are started incrementally inside the loop.
    futures.add(submitAndAddQueueListener(executorService, it.next(), futureQueue));
    --ntasks;
    int active = 1;

    for (;;) {
      // Non-blocking check for a completed future.
      Future<T> f = futureQueue.poll();
      if (f == null) {
        if (ntasks > 0) {
          // Nothing finished yet and tasks remain: submit the next one.
          --ntasks;
          futures.add(submitAndAddQueueListener(executorService, it.next(), futureQueue));
          ++active;
        } else if (active == 0) {
          // Everything was submitted and every future has been consumed without success.
          break;
        } else if (timed) {
          f = futureQueue.poll(nanos, TimeUnit.NANOSECONDS);
          if (f == null) {
            throw new TimeoutException();
          }
          // Charge the time elapsed since the last measurement against the budget.
          long now = System.nanoTime();
          nanos -= now - lastTime;
          lastTime = now;
        } else {
          f = futureQueue.take();
        }
      }
      if (f != null) {
        --active;
        try {
          return f.get();
        } catch (ExecutionException eex) {
          ee = eex;
        } catch (RuntimeException rex) {
          ee = new ExecutionException(rex);
        }
      }
    }

    if (ee == null) {
      ee = new ExecutionException(null);
    }
    throw ee;
  } finally {
    // Cancel whatever was submitted; this runs on the success path too.
    for (Future<T> f : futures) {
      f.cancel(true);
    }
  }
}
741
742
743
744
745 private static <T> ListenableFuture<T> submitAndAddQueueListener(
746 ListeningExecutorService executorService, Callable<T> task,
747 final BlockingQueue<Future<T>> queue) {
748 final ListenableFuture<T> future = executorService.submit(task);
749 future.addListener(new Runnable() {
750 @Override public void run() {
751 queue.add(future);
752 }
753 }, directExecutor());
754 return future;
755 }
756
757
758
759
760
761
762
763
764
765 @Beta
766 public static ThreadFactory platformThreadFactory() {
767 if (!isAppEngine()) {
768 return Executors.defaultThreadFactory();
769 }
770 try {
771 return (ThreadFactory) Class.forName("com.google.appengine.api.ThreadManager")
772 .getMethod("currentRequestThreadFactory")
773 .invoke(null);
774 } catch (IllegalAccessException e) {
775 throw new RuntimeException("Couldn't invoke ThreadManager.currentRequestThreadFactory", e);
776 } catch (ClassNotFoundException e) {
777 throw new RuntimeException("Couldn't invoke ThreadManager.currentRequestThreadFactory", e);
778 } catch (NoSuchMethodException e) {
779 throw new RuntimeException("Couldn't invoke ThreadManager.currentRequestThreadFactory", e);
780 } catch (InvocationTargetException e) {
781 throw Throwables.propagate(e.getCause());
782 }
783 }
784
785 private static boolean isAppEngine() {
786 if (System.getProperty("com.google.appengine.runtime.environment") == null) {
787 return false;
788 }
789 try {
790
791 return Class.forName("com.google.apphosting.api.ApiProxy")
792 .getMethod("getCurrentEnvironment")
793 .invoke(null) != null;
794 } catch (ClassNotFoundException e) {
795
796 return false;
797 } catch (InvocationTargetException e) {
798
799 return false;
800 } catch (IllegalAccessException e) {
801
802 return false;
803 } catch (NoSuchMethodException e) {
804
805 return false;
806 }
807 }
808
809
810
811
812
813 static Thread newThread(String name, Runnable runnable) {
814 checkNotNull(name);
815 checkNotNull(runnable);
816 Thread result = platformThreadFactory().newThread(runnable);
817 try {
818 result.setName(name);
819 } catch (SecurityException e) {
820
821 }
822 return result;
823 }
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840 static Executor renamingDecorator(final Executor executor, final Supplier<String> nameSupplier) {
841 checkNotNull(executor);
842 checkNotNull(nameSupplier);
843 if (isAppEngine()) {
844
845 return executor;
846 }
847 return new Executor() {
848 @Override public void execute(Runnable command) {
849 executor.execute(Callables.threadRenaming(command, nameSupplier));
850 }
851 };
852 }
853
854
855
856
857
858
859
860
861
862
863
864
865
866 static ExecutorService renamingDecorator(final ExecutorService service,
867 final Supplier<String> nameSupplier) {
868 checkNotNull(service);
869 checkNotNull(nameSupplier);
870 if (isAppEngine()) {
871
872 return service;
873 }
874 return new WrappingExecutorService(service) {
875 @Override protected <T> Callable<T> wrapTask(Callable<T> callable) {
876 return Callables.threadRenaming(callable, nameSupplier);
877 }
878 @Override protected Runnable wrapTask(Runnable command) {
879 return Callables.threadRenaming(command, nameSupplier);
880 }
881 };
882 }
883
884
885
886
887
888
889
890
891
892
893
894
895
896 static ScheduledExecutorService renamingDecorator(final ScheduledExecutorService service,
897 final Supplier<String> nameSupplier) {
898 checkNotNull(service);
899 checkNotNull(nameSupplier);
900 if (isAppEngine()) {
901
902 return service;
903 }
904 return new WrappingScheduledExecutorService(service) {
905 @Override protected <T> Callable<T> wrapTask(Callable<T> callable) {
906 return Callables.threadRenaming(callable, nameSupplier);
907 }
908 @Override protected Runnable wrapTask(Runnable command) {
909 return Callables.threadRenaming(command, nameSupplier);
910 }
911 };
912 }
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939 @Beta
940 public static boolean shutdownAndAwaitTermination(
941 ExecutorService service, long timeout, TimeUnit unit) {
942 checkNotNull(unit);
943
944 service.shutdown();
945 try {
946 long halfTimeoutNanos = TimeUnit.NANOSECONDS.convert(timeout, unit) / 2;
947
948 if (!service.awaitTermination(halfTimeoutNanos, TimeUnit.NANOSECONDS)) {
949
950 service.shutdownNow();
951
952 service.awaitTermination(halfTimeoutNanos, TimeUnit.NANOSECONDS);
953 }
954 } catch (InterruptedException ie) {
955
956 Thread.currentThread().interrupt();
957
958 service.shutdownNow();
959 }
960 return service.isTerminated();
961 }
962 }